// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package client

import (
	"bytes"
	"encoding/binary"
	"errors"
	"gitee.com/zhongguo168a/go-nodex/nodex/server/message"
	"gitee.com/zhongguo168a/gocodes/myx/errorx"
	"io"
	"log"
	"net"
	"sync"
)

var (
	debugLog bool
)

// ServerError represents an error that has been returned from
// the remote side of the RPC connection.
type ServerError string

func (e ServerError) Error() string {
	return string(e)
}

var ErrShutdown = errors.New("connection is shut down")

// Call represents an active RPC.
type Call struct {
	Route string      // The name of the service and method to call.
	Args  interface{} // The argument to the function (*struct).
	Reply interface{} // The reply from the function (*struct).
	Error error       // After completion, the error status.
	Done  chan *Call  // Receives *Call when Go is complete.

}

// NewClientByConn returns a new Client to handle requests to the
// set of services at the other end of the connection.
// It adds a buffer to the write side of the connection so
// the header and payload are sent as a unit.
//
// The read and write halves of the connection are serialized independently,
// so no interlocking is required. However each half may be accessed
// concurrently so the implementation of conn should protect against
// concurrent reads or concurrent writes.
func NewClientByConn(address string, conn net.Conn) *Client {
	client := &Client{
		address: address,
		conn:    conn,
		encBuf:  bytes.NewBuffer([]byte{}),
		pending: make(map[uint32]*Call),
	}
	go client.input()
	return client
}

func NewClient() (obj *Client) {
	obj = &Client{
		encBuf:  bytes.NewBuffer([]byte{}),
		pending: make(map[uint32]*Call),
	}
	return
}

// Client represents an RPC Client.
// There may be multiple outstanding Calls associated
// with a single Client, and a Client may be used by
// multiple goroutines simultaneously.
type Client struct {
	reqMutex sync.Mutex // protects following
	request  *message.RequestMessage

	mutex    sync.Mutex // protects following
	seq      uint32
	pending  map[uint32]*Call
	closing  bool // user has called Close
	shutdown bool // server has told us to stop

	address string
	conn    net.Conn
	encBuf  *bytes.Buffer

	token string
}

// WithToken Close 的时候会自动清空
func (client *Client) WithToken(token string) *Client {
	client.token = token
	return client
}

func (client *Client) send(call *Call) {

	client.reqMutex.Lock()
	defer client.reqMutex.Unlock()

	// Register this call.
	client.mutex.Lock()
	if client.shutdown || client.closing {
		client.mutex.Unlock()
		call.Error = ErrShutdown
		call.done()
		return
	}
	seq := client.seq
	client.seq++
	client.pending[seq] = call
	client.mutex.Unlock()

	// Encode and send the request.
	client.request = &message.RequestMessage{}
	client.request.Seq = seq
	client.request.Route = call.Route
	client.request.Token = client.token

	buff := bytes.NewBuffer([]byte{})
	if err := message.Pack(buff, client.request.SerializeMode, call.Args); err != nil {
		call.Error = errorx.Wrap(err, "pack args")
		call.done()
		return
	}

	client.request.Data = buff.Bytes()
	//client.request.Route = call.Route

	if err := client.request.Pack(client.encBuf); err != nil {
		call.Error = errorx.Wrap(err, "pack request")
		call.done()
		return
	}

	_, err := client.conn.Write(client.encBuf.Bytes())
	if err != nil {
		client.mutex.Lock()
		call = client.pending[seq]
		delete(client.pending, seq)
		client.mutex.Unlock()
		if call != nil {
			call.Error = err
			call.done()
		}
		_ = client.close()
	}
	client.encBuf.Reset()
}

func (client *Client) input() {
	var err error
	var n int
	var response message.ResponseMessage
	// 消息长度
	length := 0
	// 消息长度uint32
	ulength := uint16(0)
	// 消息缓冲
	msgbuf := bytes.NewBuffer(make([]byte, 0, 10240))
	// 数据缓冲
	databuf := make([]byte, 4096)
	for err == nil {
		n, err = client.conn.Read(databuf)
		if err == io.EOF {
		}
		if err != nil {
			err = errorx.Wrap(err, "read connection")
			continue
		}
		// 数据添加到消息缓冲
		n, err = msgbuf.Write(databuf[:n])
		if err != nil {
			err = errorx.Wrap(err, "msgbuf.Write")
			continue
		}

		// 消息分割循环
		for {
			// 消息头
			if length == 0 && msgbuf.Len() >= 2 { // uint16
				_ = binary.Read(msgbuf, binary.LittleEndian, &ulength)
				length = int(ulength)
				// 检查超长消息
				if length > 10240 {
					err = errorx.Wrap(err, "package length too long")
					continue
				}
			}
			// 消息体
			if length > 0 && msgbuf.Len() >= length {
				data := msgbuf.Next(length)
				reader := bytes.NewReader(data)
				var tag int8
				_ = binary.Read(reader, binary.LittleEndian, &tag)
				if message.PushTag(tag) != message.PushTag_Response {
					continue
				}
				response = message.ResponseMessage{}
				err = response.Unpack(reader)
				if err != nil {
					break
				}
				seq := response.Seq
				client.mutex.Lock()
				call := client.pending[seq]
				delete(client.pending, seq)
				client.mutex.Unlock()

				switch {
				case call == nil:
					// We've got no pending call. That usually means that
					// WriteRequest partially failed, and call was already
					// removed; response is a server telling us about an
					// error reading request body. We should still attempt
					// to read error body, but there's no one to give it to.

				case response.Error != "":
					// We've got an error response. Give this to the request;
					// any subsequent requests will get the ReadResponseBody
					// error if there is one.
					call.Error = ServerError(response.Error)
					call.done()
				default:

					err = message.Unpack(response.Data, response.SerializeMode, call.Reply)
					if err != nil {
						call.Error = errorx.Wrap(err, "message.Unpack")
					}
					call.done()
				}

				length = 0
			} else {
				break
			}
		}

	}
	// Terminate pending calls.
	client.reqMutex.Lock()
	client.mutex.Lock()
	client.shutdown = true
	closing := client.closing
	if err == io.EOF {
		if closing {
			err = ErrShutdown
		} else {
			err = io.ErrUnexpectedEOF
		}
	}
	for _, call := range client.pending {
		call.Error = err
		call.done()
	}
	client.mutex.Unlock()
	client.reqMutex.Unlock()
	if debugLog && err != io.EOF && !closing {
		log.Println("rpc: client protocol error:", err)
	}
}

func (call *Call) done() {
	select {
	case call.Done <- call:
		// ok
	default:
		// We don't want to block here. It is the caller's responsibility to make
		// sure the channel has enough buffer space. See comment in Go().
		if debugLog {
			log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
		}
	}
}

func (client *Client) Close() {
	if client.closing || client.shutdown {
		return
	}
	// 关闭时，如果链接已经关闭，则不返回池中
	putClient(client)
}

// Close calls the underlying codec's Close method. If the connection is already
// shutting down, ErrShutdown is returned.
func (client *Client) close() error {
	client.mutex.Lock()
	if client.closing {
		client.mutex.Unlock()
		return ErrShutdown
	}
	client.closing = true
	client.mutex.Unlock()
	return client.conn.Close()
}

// Go invokes the function asynchronously. It returns the Call structure representing
// the invocation. The done channel will signal when the call is complete by returning
// the same Call object. If done is nil, Go will allocate a new channel.
// If non-nil, done must be buffered or Go will deliberately crash.
func (client *Client) Go(route string, args interface{}, reply interface{}, done chan *Call) *Call {
	call := new(Call)
	call.Route = route
	call.Args = args
	call.Reply = reply
	if done == nil {
		done = make(chan *Call, 10) // buffered.
	} else {
		// If caller passes done != nil, it must arrange that
		// done has enough buffer for the number of simultaneous
		// RPCs that will be using that channel. If the channel
		// is totally unbuffered, it's best not to run at all.
		if cap(done) == 0 {
			log.Panic("rpc: done channel is unbuffered")
		}
	}
	call.Done = done
	client.send(call)
	return call
}

// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(route string, args interface{}, reply interface{}) error {
	call := <-client.Go(route, args, reply, make(chan *Call, 1)).Done
	return call.Error
}
